{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Bring Your Own Pipe-mode Algorithm\n",
    "_**Create a Docker container for training SageMaker algorithms using Pipe-mode**_\n",
    "\n",
    "---\n",
    "\n",
    "---\n",
    "\n",
    "## Contents\n",
    "\n",
    "1. [Overview](#Overview)\n",
    "1. [Preparation](#Preparation)\n",
    "  1. [Permissions](#Permissions)\n",
    "1. [Code](#Code)\n",
    "  1. [train.py](#train.py)\n",
    "  1. [Dockerfile](#Dockerfile)\n",
    "1. [Publish](#Publish)\n",
    "1. [Train](#Train)\n",
    "1. [Conclusion](#Conclusion)\n",
    "\n",
    "\n",
    "---\n",
    "## Overview\n",
    "\n",
    "SageMaker Training supports two different mechanisms with which to transfer training data to a training algorithm: File-mode and Pipe-mode.\n",
    "\n",
    "In File-mode training data is downloaded to an encrypted EBS volume prior to commencing training. Once downloaded, the training algorithm simply trains by reading the downloaded training data files.\n",
    "\n",
    "On the other hand, in Pipe-mode the input data is transferred to the algorithm while it is training. This poses a few significant advantages over File-mode:\n",
    "\n",
    "\n",
    "*  In File-mode, training startup time is proportional to size of the input data. In Pipe-mode, the startup delay is constant, independent of the size of the input data. This translates to much faster training startup for training jobs with large GB/PB-scale training datasets.\n",
    "* You do not need to allocate (and pay for) a large disk volume to be able to download the dataset.\n",
    "* Throughput on IO-bound Pipe-mode algorithms can be multiple times faster than on equivalent File-mode algorithms.\n",
    "\n",
    "However, these advantages come at a cost - a more complicated programming model than simply reading from files on a disk. This notebook aims to clarify what you need to do in order to use Pipe-mode in your custom training algorithm.\n",
    "\n",
    "\n",
    "---\n",
    "## Preparation\n",
    "\n",
    "_This notebook was created and tested on an ml.t2.medium notebook instance._\n",
    "\n",
    "Let's start by specifying:\n",
    "\n",
    "- S3 URIs `s3_training_input` and `s3_model_output` that you want to use for training input and model data respectively.  These should be within the same region as the Notebook Instance, training, and hosting. Since the \"algorithm\" we're building here doesn't really have any specific data-format, feel free to point `s3_training_input` to any s3 dataset you have, the bigger the dataset the better to test the raw IO throughput performance.\n",
    "- The `training_instance_type` to use for training. More powerful instance types have more CPU and bandwidth which would result in higher throughput.\n",
    "- The IAM role arn used to give training access to your data.\n",
    "\n",
    "### Permissions\n",
    "\n",
    "Running this notebook requires permissions in addition to the normal `SageMakerFullAccess` permissions. This is because we'll be creating a new repository in Amazon ECR. The easiest way to add these permissions is simply to add the managed policy `AmazonEC2ContainerRegistryFullAccess` to the role that you used to start your notebook instance. There's no need to restart your notebook instance when you do this, the new permissions will be available immediately."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "isConfigCell": true
   },
   "outputs": [],
   "source": [
    "s3_training_input = 's3://<your_s3_bucket_name_here>/<training_data_prefix>/'\n",
    "s3_model_output = 's3://<your_s3_bucket_name_here/<model_output_prefix>/'\n",
    "# We're using a cheaper instance here, switch to a higher-end ml.c5.18xlarge\n",
    "# to achieve much higher throughput performance:\n",
    "training_instance_type = \"ml.m4.xlarge\"\n",
    "\n",
    "\n",
    "# Define IAM role\n",
    "import boto3\n",
    "import re\n",
    "from sagemaker import get_execution_role\n",
    "from sagemaker.session import Session\n",
    "\n",
    "role = get_execution_role()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "## Code\n",
    "\n",
    "For the purposes of this demo we're going to write an extremely simple “training” algorithm in Python. In essence it will conform to the specifications required by SageMaker Training and will read data in Pipe-mode but will do nothing with the data, simply reading it and throwing it away. We're doing it this way to be able to illustrate only exactly what's needed to support Pipe-mode without complicating the code with a real training algorithm.\n",
    "\n",
    "In Pipe-mode, data is pre-fetched from S3 at high-concurrency and throughput  and streamed into Unix Named Pipes (aka FIFOs) - one FIFO per Channel per epoch. The algorithm must open the FIFO for reading and read through to <EOF> (or optionally abort mid-stream) and close its end of the file descriptor when done. It can then optionally wait for the next epoch's FIFO to get created and commence reading, iterating through epochs until it has achieved its completion criteria.\n",
    "\n",
    "For this example, we'll need two supporting files:\n",
    "\n",
    "### train.py\n",
    "\n",
    "`train.py` simply iterates through 5 epochs on the `training` Channel. Each epoch involves reading the training data stream from a FIFO named `/opt/ml/input/data/training_${epoch}`. At the end of the epoch the code simply iterates to the next epoch, waits for the new epoch's FIFO to get created and continues on.\n",
    "\n",
    "A lot of the code in `train.py` is merely boilerplate code, dealing with printing log messages, trapping termination signals etc. The main code that iterates through reading each epoch's data through its corresponding FIFO is the following:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```python\n",
    "# we're allocating a byte array here to read data into, a real algo\n",
    "# may opt to prefetch the data into a memory buffer and train in\n",
    "# in parallel so that both IO and training happen simultaneously\n",
    "data = bytearray(16777216)\n",
    "total_read = 0\n",
    "total_duration = 0\n",
    "for epoch in range(num_epochs):\n",
    "    check_termination()\n",
    "    epoch_bytes_read = 0\n",
    "    # As per SageMaker Training spec, the FIFO's path will be based on\n",
    "    # the channel name and the current epoch:\n",
    "    fifo_path = '{0}/{1}_{2}'.format(data_dir, channel_name, epoch)\n",
    "\n",
    "    # Usually the fifo will already exist by the time we get here, but\n",
    "    # to be safe we should wait to confirm:\n",
    "    wait_till_fifo_exists(fifo_path)\n",
    "    with open(fifo_path, 'rb', buffering=0) as fifo:\n",
    "        print('opened fifo: %s' % fifo_path)\n",
    "        # Now simply iterate reading from the file until EOF. Again, a\n",
    "        # real algorithm will actually do something with the data\n",
    "        # rather than simply reading and immediately discarding like we\n",
    "        # are doing here\n",
    "        start = time.time()\n",
    "        bytes_read = fifo.readinto(data)\n",
    "        total_read += bytes_read\n",
    "        epoch_bytes_read += bytes_read\n",
    "        while bytes_read > 0 and not terminated:\n",
    "            bytes_read = fifo.readinto(data)\n",
    "            total_read += bytes_read\n",
    "            epoch_bytes_read += bytes_read\n",
    "\n",
    "        duration = time.time() - start\n",
    "        total_duration += duration\n",
    "        epoch_throughput = epoch_bytes_read / duration / 1000000\n",
    "        print('Completed epoch %s; read %s bytes; time: %.2fs, throughput: %.2f MB/s'\n",
    "              % (epoch, epoch_bytes_read, duration, epoch_throughput))\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Dockerfile\n",
    "\n",
    "Smaller containers are preferred for Amazon SageMaker as they lead to faster spin up times in training and endpoint creation, so this container is kept minimal.  It simply starts with Alpine (a minimal Linux install) with python then adds `train.py`, and finally runs `train.py` when the entrypoint is launched.\n",
    "\n",
    "```Dockerfile\n",
    "# use minimal alpine base image as we only need python and nothing else here\n",
    "FROM python:2-alpine3.6\n",
    "\n",
    "MAINTAINER Amazon SageMaker Examples <amazon-sagemaker-examples@amazon.com>\n",
    "\n",
    "COPY train.py /train.py\n",
    "\n",
    "ENTRYPOINT [\"python2.7\", \"-u\", \"/train.py\"]\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "## Publish\n",
    "Now, to publish this container to ECR, we'll run the comands below.\n",
    "\n",
    "This command will take several minutes to run the first time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sh\n",
    "\n",
    "# The name of our algorithm\n",
    "algorithm_name=sagemaker-pipe-demo\n",
    "\n",
    "set -eu # stop if anything fails\n",
    "\n",
    "account=$(aws sts get-caller-identity --query Account --output text)\n",
    "\n",
    "# Get the region defined in the current configuration (default to us-west-2 if none defined)\n",
    "region=$(aws configure get region)\n",
    "region=${region:-us-west-2}\n",
    "\n",
    "fullname=\"${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:latest\"\n",
    "\n",
    "# If the repository doesn't exist in ECR, create it.\n",
    "\n",
    "aws ecr describe-repositories --repository-names \"${algorithm_name}\" > /dev/null 2>&1\n",
    "\n",
    "if [ $? -ne 0 ]\n",
    "then\n",
    "    aws ecr create-repository --repository-name \"${algorithm_name}\" > /dev/null\n",
    "fi\n",
    "\n",
    "# Get the login command from ECR and execute it directly\n",
    "$(aws ecr get-login --region ${region} --no-include-email)\n",
    "\n",
    "# Build the docker image locally with the image name and then push it to ECR\n",
    "# with the full name.\n",
    "docker build  -t ${algorithm_name} .\n",
    "docker tag ${algorithm_name} ${fullname}\n",
    "\n",
    "docker push ${fullname}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "## Train\n",
    "\n",
    "Now, let's setup the information needed to run the training container in SageMaker.\n",
    "\n",
    "First, we'll get our region and account information so that we can point to the ECR container we just created."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "region = boto3.Session().region_name\n",
    "account = boto3.client('sts').get_caller_identity().get('Account')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "- Specify the role to use\n",
    "- Give the training job a name\n",
    "- Point the algorithm to the container we created\n",
    "- Specify training instance resources\n",
    "- Point to the S3 location of our input data and the `training` channel expected by our algorithm\n",
    "- Point to the S3 location for output\n",
    "- Maximum run time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import json\n",
    "import os\n",
    "\n",
    "pipe_job = 'DEMO-pipe-byo-' + time.strftime(\"%Y-%m-%d-%H-%M-%S\", time.gmtime())\n",
    "\n",
    "print(\"Training job\", pipe_job)\n",
    "\n",
    "training_params = {\n",
    "    \"RoleArn\": role,\n",
    "    \"TrainingJobName\": pipe_job,\n",
    "    \"AlgorithmSpecification\": {\n",
    "        \"TrainingImage\": '{}.dkr.ecr.{}.amazonaws.com/sagemaker-pipe-demo:latest'.format(account, region),\n",
    "        \"TrainingInputMode\": \"Pipe\"\n",
    "    },\n",
    "    \"ResourceConfig\": {\n",
    "        \"InstanceCount\": 1,\n",
    "        \"InstanceType\": \"{}\".format(training_instance_type),\n",
    "        \"VolumeSizeInGB\": 1\n",
    "    },\n",
    "    \"InputDataConfig\": [\n",
    "        {\n",
    "            \"ChannelName\": \"training\",\n",
    "            \"DataSource\": {\n",
    "                \"S3DataSource\": {\n",
    "                    \"S3DataType\": \"S3Prefix\",\n",
    "                    \"S3Uri\": \"{}\".format(s3_training_input),\n",
    "                    \"S3DataDistributionType\": \"FullyReplicated\"\n",
    "                }\n",
    "            },\n",
    "            \"CompressionType\": \"None\",\n",
    "            \"RecordWrapperType\": \"None\"\n",
    "        }\n",
    "    ],\n",
    "    \"OutputDataConfig\": {\n",
    "        \"S3OutputPath\": \"{}\".format(s3_model_output)\n",
    "    },\n",
    "    \"StoppingCondition\": {\n",
    "        \"MaxRuntimeInSeconds\": 60 * 60\n",
    "    }\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's kick off our training job on Amazon SageMaker Training using the parameters we just created.  Because training is managed (AWS takes care of spinning up and spinning down the hardware), we don't have to wait for our job to finish to continue, but for this case, let's setup a waiter so we can monitor the status of our training."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "sm_session = Session()\n",
    "sm = boto3.client('sagemaker')\n",
    "sm.create_training_job(**training_params)\n",
    "\n",
    "status = sm.describe_training_job(TrainingJobName=pipe_job)['TrainingJobStatus']\n",
    "print(status)\n",
    "sm_session.logs_for_job(job_name=pipe_job, wait=True)\n",
    "sm.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=pipe_job)\n",
    "status = sm.describe_training_job(TrainingJobName=pipe_job)['TrainingJobStatus']\n",
    "print(\"Training job ended with status: \" + status)\n",
    "if status == 'Failed':\n",
    "    message = sm.describe_training_job(TrainingJobName=pipe_job)['FailureReason']\n",
    "    print('Training failed with the following error: {}'.format(message))\n",
    "    raise Exception('Training job failed')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note the throughput logged by the training logs above. By way of comparison a File-mode algorithm will achieve at most  approximately 150MB/s on a high-end `ml.c5.18xlarge` and approximately 75MB/s on a `ml.m4.xlarge`.\n",
    "\n",
    "---\n",
    "## Conclusion\n",
    "There are a few situations where Pipe-mode may not be the optimum choice for training in which case you should stick to using File-mode:\n",
    "\n",
    "* If your algorithm needs to backtrack or skip ahead within an epoch. This is simply not possible in Pipe-mode since the underlying FIFO cannot not support `lseek()` operations.\n",
    "* If your training dataset is small enough to fit in memory and you need to run multiple epochs. In this case may be quicker and easier just to load it all into memory and iterate.\n",
    "* Your training dataset is not easily parse-able from a streaming source.\n",
    "\n",
    "In all other scenarios, if you have an IO-bound training algorithm, switching to Pipe-mode may give you a significant throughput-boost and will reduce the size of the disk volume required. This should result in both saving you time and reducing training costs.\n",
    "\n",
    "You can read more about building your own training algorithms in the [SageMaker Training documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo.html)."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python2",
   "language": "python",
   "name": "conda_python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.15"
  },
  "notice": "Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.  Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
